// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements.  See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
= Apache Kafka Streamer

== Overview

The Apache Ignite Kafka Streamer module streams data from Kafka into an Ignite cache.
You can use either of the following two methods:

* Use Kafka Connect with the Ignite sink connector.
* Import the Kafka Streamer module into your Maven project and instantiate `KafkaStreamer` for data streaming.

== Streaming Data via Kafka Connect

`IgniteSinkConnector` exports data from Kafka to an Ignite cache by polling Kafka topics and writing the records
to the cache you specify. The connector is located in the `optional/ignite-kafka` module. The connector and its
dependencies must be on the classpath of the running Kafka instance, as described in the following subsection. _For more information
on Kafka Connect, see http://kafka.apache.org/documentation.html#connect[Kafka Documentation, window=_blank]._

=== Setting up and Running

. Add the `IGNITE_HOME/libs/optional/ignite-kafka` module to the application classpath.

. Prepare the worker configuration, for example:
+
[tabs]
--
tab:Configuration[]
[source,properties]
----
bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.storage.StringConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
----
--

. Prepare the connector configuration, for example:
+
[tabs]
--
tab:Configuration[]
[source,properties]
----
# connector
name=my-ignite-connector
connector.class=org.apache.ignite.stream.kafka.connect.IgniteSinkConnector
tasks.max=2
topics=someTopic1,someTopic2

# cache
cacheName=myCache
cacheAllowOverwrite=true
igniteCfg=/some-path/ignite.xml
singleTupleExtractorCls=my.company.MyExtractor
----
--
+
* `cacheName` is the name of a cache defined in `/some-path/ignite.xml`; data pulled from `someTopic1,someTopic2` is stored in this cache.
* `cacheAllowOverwrite` can be set to `true` to enable overwriting existing values in the cache.
* If you need to parse the incoming data and derive a new key and value, implement that logic as a `StreamSingleTupleExtractor` and specify the class in `singleTupleExtractorCls`.
* You can also set `cachePerNodeDataSize` and `cachePerNodeParOps` to adjust the per-node buffer size and the maximum number of parallel stream operations for a single node.

. Start the connector, for instance in standalone mode:
+
[tabs]
--
tab:Shell[]
[source,shell]
----
bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties
----
--
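
The parsing logic behind a class named in `singleTupleExtractorCls` could look roughly like the sketch below. It only shows how a `key,value` message might be split into a cache entry using the JDK; the class name `CsvTupleParser` is hypothetical, and the wiring into a `StreamSingleTupleExtractor` implementation is omitted because it requires the Ignite and Kafka Connect classes on the classpath.

[tabs]
--
tab:Java[]
[source,java]
----
import java.util.AbstractMap;
import java.util.Map;

/** Hypothetical helper (not part of Ignite or Kafka): turns "key,value" into a cache entry. */
final class CsvTupleParser {
    /**
     * Splits the message at the first comma, so values may themselves contain commas.
     *
     * @param msg Raw message text, for example "k1,v1".
     * @return Entry whose key is the text before the first comma and whose value is the rest.
     */
    static Map.Entry<String, String> parse(String msg) {
        if (msg == null)
            throw new IllegalArgumentException("Message is null");

        int sep = msg.indexOf(',');

        if (sep < 0)
            throw new IllegalArgumentException("No key separator in message: " + msg);

        return new AbstractMap.SimpleEntry<>(msg.substring(0, sep), msg.substring(sep + 1));
    }
}
----
--

An extractor implementation would typically obtain the message text from the incoming record and delegate to a method like `CsvTupleParser.parse`.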

=== Checking the Flow

To perform a very basic functionality check, do the following:

. Start ZooKeeper
+
[tabs]
--
tab:Shell[]
[source,shell]
----
bin/zookeeper-server-start.sh config/zookeeper.properties
----
--
. Start Kafka server
+
[tabs]
--
tab:Shell[]
[source,shell]
----
bin/kafka-server-start.sh config/server.properties
----
--
. Provide some data input to the Kafka server
+
[tabs]
--
tab:Shell[]
[source,shell]
----
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --property parse.key=true --property key.separator=,
k1,v1
----
--
. Start the connector
+
[tabs]
--
tab:Shell[]
[source,shell]
----
bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties
----
--
. Check that the value is in the cache, for example via the REST API:
+
[tabs]
--
tab:Shell[]
[source,shell]
----
curl "http://node1:8080/ignite?cmd=size&cacheName=cache1"
----
--
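
The same REST check can be issued from Java. The following is a minimal sketch that assumes Java 11 or later and a node reachable at the host and port used above; the class name `CacheSizeCheck` is hypothetical, and the response body is returned as raw text without interpreting it.

[tabs]
--
tab:Java[]
[source,java]
----
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for the REST size check; not part of Ignite. */
final class CacheSizeCheck {
    /** Builds the REST URL for the `size` command, URL-encoding the cache name. */
    static URI sizeUri(String hostPort, String cacheName) {
        String encoded = URLEncoder.encode(cacheName, StandardCharsets.UTF_8);

        return URI.create("http://" + hostPort + "/ignite?cmd=size&cacheName=" + encoded);
    }

    /** Sends a GET request and returns the raw response body. */
    static String fetch(URI uri) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest req = HttpRequest.newBuilder(uri).GET().build();

        return client.send(req, HttpResponse.BodyHandlers.ofString()).body();
    }
}
----
--

For example, `CacheSizeCheck.fetch(CacheSizeCheck.sizeUri("node1:8080", "cache1"))` requests the same URL as the shell step.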

== Streaming Data with Ignite Kafka Streamer Module

If you use Maven to manage your project's dependencies, first add the Kafka Streamer module dependency as shown below
(replace `${ignite-kafka-ext.version}` with the Ignite Kafka Extension version you want to use):

[tabs]
--
tab:pom.xml[]
[source,xml]
----
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                        http://maven.apache.org/xsd/maven-4.0.0.xsd">
    ...
    <dependencies>
        ...
        <dependency>
            <groupId>org.apache.ignite</groupId>
            <artifactId>ignite-kafka-ext</artifactId>
            <version>${ignite-kafka-ext.version}</version>
        </dependency>
        ...
    </dependencies>
    ...
</project>
----
--

If the cache has `String` keys and `String` values, the streamer can be started as follows:

[tabs]
--
tab:Java[]
[source,java]
----
KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>();

IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("myCache");

// allow overwriting cache data
stmr.allowOverwrite(true);

kafkaStreamer.setIgnite(ignite);
kafkaStreamer.setStreamer(stmr);

// set the topic
kafkaStreamer.setTopic(someKafkaTopic);

// set the number of threads to process Kafka streams
kafkaStreamer.setThreads(4);

// set Kafka consumer configurations
kafkaStreamer.setConsumerConfig(kafkaConsumerConfig);

// set extractor
kafkaStreamer.setSingleTupleExtractor(strExtractor);

kafkaStreamer.start();

...

// stop on shutdown
kafkaStreamer.stop();

stmr.close();
----
--

For detailed information on Kafka consumer properties, refer to the http://kafka.apache.org/documentation.html[Kafka documentation, window=_blank].
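
The `kafkaConsumerConfig` object in the example above is not defined there. The sketch below shows one way such a configuration might be assembled, assuming `setConsumerConfig` accepts a `java.util.Properties` instance. The class name `ConsumerConfigSketch` and the group ID passed by the caller are hypothetical; the property keys are standard Kafka consumer settings.

[tabs]
--
tab:Java[]
[source,java]
----
import java.util.Properties;

/** Hypothetical helper that assembles Kafka consumer settings for String keys and values. */
final class ConsumerConfigSketch {
    /**
     * @param bootstrapServers Kafka broker list, for example "localhost:9092".
     * @param groupId Consumer group ID (any name of your choice).
     * @return Consumer properties using the String deserializers shipped with Kafka.
     */
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();

        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);

        // Keys and values are read as strings, matching a cache with String keys and values.
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Start from the earliest offset when the group has no committed offset yet.
        props.setProperty("auto.offset.reset", "earliest");

        return props;
    }
}
----
--

Under that assumption, the result could be passed as `kafkaStreamer.setConsumerConfig(ConsumerConfigSketch.build("localhost:9092", "my-group"))`.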
